package com.derbysoft.nuke.kafka.manager.infrastructure.zookeeper;

import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.ConcurrentMap;

/**
 * Caches one {@link CuratorFramework} per connect string and hands out the
 * cached instance on subsequent requests.
 *
 * <p>A connect string may carry a chroot suffix after the host list, e.g.
 * {@code host1:2181,host2:2181/kafka/cluster1}; the part after the first
 * {@code /} is used as the Curator namespace.
 *
 * <p>Clients are started before they are made visible to other callers.
 * This class never closes the clients it creates; they live as long as the
 * instance does.
 */
@Service
public class ZookeeperClientImpl implements ZookeeperClient {

    private static final int RETRY_BASE_SLEEP_MS = 1000;
    private static final int RETRY_MAX_ATTEMPTS = 3;
    private static final int SESSION_TIMEOUT_MS = 5000;
    private static final int CONNECTION_TIMEOUT_MS = 5000;

    private final ConcurrentMap<String, CuratorFramework> zookeeperClients = Maps.newConcurrentMap();

    /** Guards creation of new clients so each connect string is built and started exactly once. */
    private final Object creationLock = new Object();

    /**
     * Returns the started client for the given connect string, creating it on first use.
     *
     * @param connectString ZooKeeper host list, optionally followed by {@code /chroot}
     * @return a client on which {@code start()} has already been called
     */
    @Override
    public CuratorFramework getZookeeperClient(String connectString) {
        // Fast path: anything already in the map has been started (see below).
        CuratorFramework client = zookeeperClients.get(connectString);
        if (client != null) {
            return client;
        }
        synchronized (creationLock) {
            // Re-check under the lock: another thread may have created it meanwhile.
            client = zookeeperClients.get(connectString);
            if (client == null) {
                client = newZookeeperClient(connectString);
                // Start BEFORE publishing so no caller can ever obtain an unstarted
                // client, and so no redundant client is built and then abandoned.
                client.start();
                zookeeperClients.put(connectString, client);
            }
            return client;
        }
    }

    /**
     * Builds (but does not start) a client for the given connect string.
     *
     * @param connectString host list, optionally followed by {@code /chroot/path}
     * @return a freshly built, unstarted client
     */
    private CuratorFramework newZookeeperClient(String connectString) {
        String hosts = connectString;
        String namespace = "";
        if (connectString.contains("/")) {
            // Split at the FIRST slash only, so a nested chroot such as
            // "/kafka/cluster1" is kept whole instead of being cut to "kafka".
            List<String> parts = Splitter.on("/").limit(2).splitToList(connectString);
            hosts = parts.get(0);
            namespace = stripSlashes(parts.get(1));
        }
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(RETRY_BASE_SLEEP_MS, RETRY_MAX_ATTEMPTS);
        return CuratorFrameworkFactory.builder()
                .connectString(hosts)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .retryPolicy(retryPolicy)
                .namespace(namespace)
                .build();
    }

    /**
     * Removes leading and trailing {@code /} characters, so inputs like
     * {@code "kafka/"} still yield the namespace {@code "kafka"}.
     */
    private static String stripSlashes(String path) {
        int start = 0;
        int end = path.length();
        while (start < end && path.charAt(start) == '/') {
            start++;
        }
        while (end > start && path.charAt(end - 1) == '/') {
            end--;
        }
        return path.substring(start, end);
    }
}
